﻿using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace MyCRM.Infrastructure.Messaging
{
    /// <summary>
    /// In-process message bus backed by a Reactive Extensions <see cref="Subject{T}"/>.
    /// Messages published through <see cref="Send"/> are pushed to every handler
    /// registered for a compatible message type.
    /// </summary>
    public class RxMessageBus : IMessageBus
    {
        private readonly Subject<IMessage> _subject = new Subject<IMessage>();

        /// <summary>
        /// Publishes <paramref name="message"/> to all matching subscribers.
        /// </summary>
        /// <param name="message">The message to publish.</param>
        /// <remarks>
        /// If publishing throws, the failure is logged and re-published as an
        /// <c>ExceptionIsThrownEvent</c> (see <see cref="RejectMessageWithException"/>),
        /// and the original exception is then rethrown to the caller.
        /// </remarks>
        public void Send(IMessage message)
        {
            try
            {
                SendMessage(message);
            }
            catch (Exception e)
            {
                RejectMessageWithException(message, e);

                // Bare rethrow: "throw e;" would reset the stack trace and hide
                // the handler frame that actually failed.
                throw;
            }
        }

        /// <summary>Pushes a message into the underlying subject without any error handling.</summary>
        private void SendMessage(IMessage message)
        {
            _subject.OnNext(message);
        }

        /// <summary>
        /// Registers <paramref name="action"/> for every message of type <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Message type to listen for.</typeparam>
        /// <param name="action">Handler invoked for each matching message.</param>
        /// <returns>A handle that removes the registration when disposed.</returns>
        public IDisposable Register<T>(Action<T> action) where T : IMessage
        {
            return _subject
                .OfType<T>()
                .Subscribe(action);
        }

        /// <summary>
        /// Registers <paramref name="action"/> for messages of type <typeparamref name="T"/>,
        /// delivering them through <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="T">Message type to listen for.</typeparam>
        /// <param name="action">Handler invoked for each matching message.</param>
        /// <param name="scheduler">Scheduler on which the handler is notified.</param>
        /// <returns>A handle that removes the registration when disposed.</returns>
        public IDisposable RegisterWithScheduler<T>(Action<T> action, IScheduler scheduler) where T : IMessage
        {
            return _subject
                .OfType<T>()
                .ObserveOn(scheduler)
                .Subscribe(action);
        }

        /// <summary>
        /// Registers <paramref name="action"/> for messages of type <typeparamref name="T"/>,
        /// passed through Rx <c>Throttle</c> with the given time span and delivered on the
        /// <see cref="SynchronizationContext"/> that is current at registration time.
        /// </summary>
        /// <typeparam name="T">Message type to listen for.</typeparam>
        /// <param name="action">Handler invoked for each message that passes the throttle.</param>
        /// <param name="throttleTimeInMilliseconds">Throttle duration in milliseconds.</param>
        /// <returns>A handle that removes the registration when disposed.</returns>
        /// <remarks>
        /// <see cref="SynchronizationContext.Current"/> is read when this method is called;
        /// the caller is expected to register from a thread that has one (e.g. a UI thread).
        /// </remarks>
        public IDisposable RegisterWithThrotteledExecution<T>(Action<T> action, int throttleTimeInMilliseconds) where T : IMessage
        {
            return _subject
                .OfType<T>()
                .Throttle(new TimeSpan(0, 0, 0, 0, throttleTimeInMilliseconds))
                .ObserveOn(SynchronizationContext.Current)
                .Subscribe(action);
        }

        /// <summary>
        /// Same as <see cref="RegisterWithThrotteledExecution{T}"/>, but additionally drops a
        /// message when <paramref name="comparer"/> considers it equal to the previously
        /// delivered one (Rx <c>DistinctUntilChanged</c> applied after the throttle).
        /// </summary>
        /// <typeparam name="T">Message type to listen for.</typeparam>
        /// <param name="action">Handler invoked for each message that passes both filters.</param>
        /// <param name="throttleTimeInMilliseconds">Throttle duration in milliseconds.</param>
        /// <param name="comparer">Equality used to detect consecutive duplicates.</param>
        /// <returns>A handle that removes the registration when disposed.</returns>
        public IDisposable RegisterWithThrotteledAndDistinctExecution<T>(Action<T> action, int throttleTimeInMilliseconds, IEqualityComparer<T> comparer) where T : IMessage
        {
            return _subject
                .OfType<T>()
                .Throttle(new TimeSpan(0, 0, 0, 0, throttleTimeInMilliseconds))
                .DistinctUntilChanged(comparer)
                .ObserveOn(SynchronizationContext.Current)
                .Subscribe(action);
        }

        /// <summary>
        /// Logs <paramref name="exeption"/> to <see cref="Trace"/> and publishes an
        /// <c>ExceptionIsThrownEvent</c> carrying both the exception and the offending message.
        /// </summary>
        /// <param name="message">The message whose processing failed; may be null.</param>
        /// <param name="exeption">The exception that was raised.</param>
        /// <exception cref="ArgumentNullException"><paramref name="exeption"/> is null.</exception>
        public void RejectMessageWithException(IMessage message, Exception exeption)
        {
            if (exeption == null)
            {
                throw new ArgumentNullException("exeption");
            }

            LogException(exeption, message);
            SendMessage(new ExceptionIsThrownEvent() { Message = message, Exception = exeption });
        }

        /// <summary>
        /// Forwards every message published on <paramref name="other"/> into this bus.
        /// </summary>
        /// <param name="other">The bus whose messages should also be delivered here.</param>
        /// <exception cref="ArgumentNullException"><paramref name="other"/> is null.</exception>
        /// <remarks>
        /// The forwarding subscription is not tracked and lasts as long as
        /// <paramref name="other"/> keeps its stream alive.
        /// </remarks>
        public void Merge(IMessageBus other)
        {
            if (other == null)
            {
                throw new ArgumentNullException("other");
            }

            // Forwarding a bus into itself would re-enter OnNext forever.
            if (ReferenceEquals(other, this))
            {
                return;
            }

            // Observable.Merge only builds a new sequence and its result was being
            // discarded, so nothing was ever merged. Subscribing is what actually
            // routes the other bus's messages into this subject.
            other.Subject.Subscribe(SendMessage);
        }

        /// <summary>Read-only view of the message stream flowing through this bus.</summary>
        public IObservable<IMessage> Subject
        {
            get { return _subject; }
        }

        /// <summary>
        /// Writes the exception type, the message type, the exception message and its
        /// stack trace to <see cref="Trace"/>.
        /// </summary>
        private static void LogException(Exception error, IMessage message)
        {
            // A missing message must not make the logger itself throw and mask the real error.
            object messageType = message != null ? (object)message.GetType() : "<null>";

            Trace.WriteLine(String.Format("Exception: {0} @ {1} ----------------", error.GetType(), messageType));
            Trace.WriteLine(String.Format("Message: {0}", error.Message));
            Trace.WriteLine(String.Format("Stack: {0}", error.StackTrace));
            Trace.WriteLine("");
        }
    }
}